package Processer;

import MsgModel.UserBehavior.UserBehaviorMsg;
import Util.Const;
import Util.MongoUtil;
import Util.Utils;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.univocity.parsers.common.TextParsingException;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

/**
 * Spark Streaming processing steps for user-behavior messages.
 *
 * Created by Administrator on 2016/12/20 0020.
 */
public class UserProcesser {
    static final Logger logger = Logger.getLogger(UserProcesser.class);

    /**
     * Counts login messages per hour bucket and upserts the counts into MongoDB.
     *
     * <p>Each non-null message is mapped to a {@code (hourKey, 1)} pair, where {@code hourKey} is
     * the message timestamp formatted with the pattern {@code "yyyy-MM-dd HH:00"}. A stateful
     * mapping then keeps a running total per key, increments the {@code count} field of the
     * document whose {@code loginTime} equals the key (inserting it if absent), and emits
     * {@code (hourKey, runningTotal)}. The emitted stream is printed.
     *
     * <p>Exceptions raised while writing to Mongo are logged and swallowed, so the running total
     * kept in Spark state is still updated when the write fails.
     *
     * @param loginStream stream of login messages; the pair key is not used here, and pairs whose
     *                    message is {@code null} are skipped
     */
    public static void LoginCount(JavaPairDStream<String, UserBehaviorMsg> loginStream) {

        // The hour key is derived from the message itself, so a pair without a message cannot be
        // bucketed. Drop such pairs up front instead of dereferencing them.
        JavaPairDStream<String, UserBehaviorMsg> validStream = loginStream.filter(
                new Function<Tuple2<String, UserBehaviorMsg>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<String, UserBehaviorMsg> pair) throws Exception {
                        return pair != null && pair._2() != null;
                    }
                });

        // One (hourKey, 1) pair per login message.
        JavaPairDStream<String, Integer> numStream = validStream.mapToPair(
                new PairFunction<Tuple2<String, UserBehaviorMsg>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, UserBehaviorMsg> pair) throws Exception {
                        String key = Utils.timeStamp2Date(
                                Long.toString(pair._2().getTimeStamp()), "yyyy-MM-dd HH:00");
                        return new Tuple2<>(key, 1);
                    }
                });

        // Stateful mapping: folds each new value into the running total for its key and mirrors
        // the increment into Mongo.
        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
                new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> call(String key, Optional<Integer> one,
                                                        State<Integer> state) {
                        int increment = one.orElse(0);
                        int sum = increment + (state.exists() ? state.get() : 0);
                        long timeStamp = Utils.timeStamp();
                        state.update(sum);
                        try {
                            DBCollection ubCollection = MongoUtil.instance.getDBCollection(
                                    Const.Mongo_DB, Const.USER_LOGINCOUNT_COLLECTION);
                            // Match the document for this hour bucket.
                            BasicDBObject searchQuery = new BasicDBObject();
                            searchQuery.append("loginTime", key);
                            // Add this record's increment and stamp the modification time.
                            // NOTE(review): $inc is not idempotent; if Spark ever re-runs this
                            // function for the same record the Mongo count would drift from the
                            // Spark state - confirm whether that matters for this collection.
                            BasicDBObject newDocument = new BasicDBObject();
                            newDocument.append("$inc", new BasicDBObject().append("count", increment))
                                    .append("$set", new BasicDBObject().append("CreateOrUpdateTime", timeStamp));
                            // upsert = true, multi = false
                            ubCollection.update(searchQuery, newDocument, true, false);
                        } catch (Exception ex) {
                            // Best effort: a failed Mongo write must not fail the streaming batch.
                            logger.error("Mongo异常", ex);
                        }
                        // Returned after the try/catch (not from a finally block) so that Errors
                        // raised inside the try are not silently discarded.
                        return new Tuple2<>(key, sum);
                    }
                };

        // DStream of cumulative counts that is updated in every batch.
        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
                numStream.mapWithState(StateSpec.function(mappingFunc));
        // Print the emitted (hourKey, runningTotal) pairs.
        stateDstream.print();

    }
}
